/*
 * Copyright 2013 LinkedIn Corp. All rights reserved
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package com.linkedin.databus.client.pub.mbean;


import java.io.IOException;
import java.io.OutputStream;
import java.util.Hashtable;
import java.util.List;
import java.util.concurrent.locks.Lock;

import javax.management.MBeanServer;
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;

import org.apache.avro.io.JsonEncoder;
import org.apache.avro.specific.SpecificDatumWriter;
import org.apache.commons.math3.stat.StatUtils;

import com.codahale.metrics.MergeableExponentiallyDecayingReservoir;

// auto-generated by Avro from UnifiedClientStatsEvent.*.avsc:
import com.linkedin.databus.client.pub.monitoring.events.UnifiedClientStatsEvent;
import com.linkedin.databus.core.DbusClientMode;
import com.linkedin.databus.core.DbusConstants;
import com.linkedin.databus.core.DbusEvent;
import com.linkedin.databus.core.monitoring.mbean.AbstractMonitoringMBean;
import com.linkedin.databus.core.monitoring.mbean.DatabusMonitoringMBean;
import com.linkedin.databus.core.monitoring.mbean.StatsCollectorMergeable;

/**
 * Joint relay/bootstrap client-library metrics for Databus v2 and v3 consumers.  At the lowest
 * level, these stats may represent a single table (in a v2 multi-tenant setup) or a single
 * partition for a single table (in a client load-balancing [CLB] setup); in the latter case,
 * they may not even be exposed.  The same class is also used to aggregate stats over multiple
 * partitions and/or tables.  Thus in the CLB case, for example, the lowest level is one partition
 * of a table; the second level is aggregated across all partitions for a single table; and the
 * third level is aggregated across all tables [for a single DB or for all subscriptions? TODO/FIXME].
 *
 * Locking:  all mutators acquire the superclass write lock and all getters acquire the superclass
 * read lock (both locks may be no-ops when the collector was created with threadSafe == false).
 *
 * This class is intended to supersede ConsumerCallbackStats for consumers.
 */
public class UnifiedClientStats extends AbstractMonitoringMBean<UnifiedClientStatsEvent>
                                implements UnifiedClientStatsMBean,
                                           StatsCollectorMergeable<UnifiedClientStats>
{
  // default threshold for lack of data events to be considered "idle":
  public static final int DEFAULT_DEADNESS_THRESHOLD_MS = 300000;

  private final String _name;
  private final String _dimension;  // used in MBean/JMX naming to distinguish similar collectors; see mbeanProps below
  private final MBeanServer _mbeanServer;
  private final MergeableExponentiallyDecayingReservoir _reservoirTimeLagConsumerCallbacksMs;

  // null while bootstrapping (see setBootstrappingState()); non-null otherwise
  private MergeableExponentiallyDecayingReservoir _reservoirTimeLagSourceToReceiptMs;
  private long _deadnessThresholdMs;
  private boolean _isBootstrapping = false;

  /**
   * Test-only convenience constructor:  enabled, not thread-safe, default deadness threshold,
   * no initial data, and no MBean server.
   */
  public UnifiedClientStats(int ownerId, String name, String dimension)
  {
    this(ownerId, name, dimension, true, false, DEFAULT_DEADNESS_THRESHOLD_MS, null, null);
  }

  /**
   * Creates a collector that is not registered with any MBean server.
   */
  public UnifiedClientStats(int ownerId, String name, String dimension, boolean enabled,
                            boolean threadSafe, long deadnessThresholdMs, UnifiedClientStatsEvent initData)
  {
    this(ownerId, name, dimension, enabled, threadSafe, deadnessThresholdMs, initData, null);
  }

  /**
   * Full constructor.
   *
   * @param ownerId              owner id; also used in the JMX object name
   * @param name                 collector name
   * @param dimension            distinguishes similar collectors in the JMX object name
   * @param enabled              if false, the register and set calls are no-ops
   * @param threadSafe           passed to the superclass (controls whether locking is performed)
   * @param deadnessThresholdMs  heartbeat age beyond which a connection is counted as dead
   * @param initData             initial event data passed to the superclass (may be null)
   * @param server               MBean server to register with (may be null)
   */
  public UnifiedClientStats(int ownerId, String name, String dimension, boolean enabled,
                            boolean threadSafe, long deadnessThresholdMs, UnifiedClientStatsEvent initData,
                            MBeanServer server)
  {
    super(enabled, threadSafe, initData);
    _event.ownerId = ownerId;
    _name = name;
    _dimension = dimension;
    _deadnessThresholdMs = deadnessThresholdMs;
    _mbeanServer = server;
    _reservoirTimeLagSourceToReceiptMs = new MergeableExponentiallyDecayingReservoir();
    _reservoirTimeLagConsumerCallbacksMs = new MergeableExponentiallyDecayingReservoir();
    resetData();
    registerAsMbean();
  }

  /**
   * Records a transition into or out of bootstrap mode.  While bootstrapping, the
   * source-to-receipt reservoir is dropped (its getters report -1.0), and it is recreated
   * empty on the way back to online consumption.
   */
  public void setBootstrappingState(boolean isBootstrapping)
  {
    if (!_enabled.get()) return;
    Lock writeLock = acquireWriteLock();
    try
    {
      // timeLagSourceToReceiptMs is defined to be -1 when bootstrapping, which means there's no need even to
      // collect the data.  If codahale's Reservoir interface supported a clear() method, we could just use that...
      if (!_isBootstrapping && isBootstrapping)
      {
        // online-consumption -> bootstrap transition:  nuke _reservoirTimeLagSourceToReceiptMs
        _reservoirTimeLagSourceToReceiptMs = null;
      }
      else if (_isBootstrapping && !isBootstrapping)
      {
        // bootstrap -> online-consumption transition:  resurrect _reservoirTimeLagSourceToReceiptMs
        _reservoirTimeLagSourceToReceiptMs = new MergeableExponentiallyDecayingReservoir();
      }
      _isBootstrapping = isBootstrapping;
      _event.curBootstrappingPartitions = isBootstrapping ? 1 : 0;
    }
    finally
    {
      releaseLock(writeLock);
    }
  }

  /**
   * Stores the timestamp (ms) of the most recent heartbeat; used for dead-connection detection.
   */
  public void setHeartbeatTimestamp(long heartbeatTimestamp)
  {
    if (!_enabled.get()) return;
    Lock writeLock = acquireWriteLock();
    try
    {
      _event.timestampOfLastHeartbeatMs = heartbeatTimestamp;
    }
    finally
    {
      releaseLock(writeLock);
    }
  }

  // used only for tests (=> no need for corresponding getter)
  public void setDeadnessThresholdMs(long deadnessThresholdMs)
  {
    _deadnessThresholdMs = deadnessThresholdMs;
  }

  public String getDimension()
  {
    return _dimension;
  }

  public String getName()
  {
    return _name;
  }

  /**
   * @return the source-to-receipt reservoir, or null while this collector is bootstrapping
   */
  public MergeableExponentiallyDecayingReservoir getReservoirTimeLagSourceToReceiptMs()
  {
    return _reservoirTimeLagSourceToReceiptMs;
  }

  public MergeableExponentiallyDecayingReservoir getReservoirTimeLagConsumerCallbacksMs()
  {
    return _reservoirTimeLagConsumerCallbacksMs;
  }

  public void registerAsMbean()
  {
    super.registerAsMbean(_mbeanServer);
  }

  public void unregisterAsMbean()
  {
    super.unregisterMbean(_mbeanServer);
  }

  @Override
  public JsonEncoder createJsonEncoder(OutputStream out) throws IOException
  {
    return new JsonEncoder(_event.getSchema(), out);
  }

  @Override
  public ObjectName generateObjectName() throws MalformedObjectNameException
  {
    Hashtable<String, String> mbeanProps = generateBaseMBeanProps();
    mbeanProps.put("ownerId", Integer.toString(_event.ownerId));
    mbeanProps.put("dimension", _dimension);
    return new ObjectName(AbstractMonitoringMBean.JMX_DOMAIN, mbeanProps);
  }

  // called by ctor above (no lock required) and by reset() in superclass (which acquires write lock)
  // NOTE(review):  the reservoirs are not cleared here; only the scalar fields of _event are reset.
  @Override
  protected void resetData()
  {
    long now = System.currentTimeMillis();

    _event.timestampLastResetMs = now;
    _event.aggregated = false;
    _event.curBootstrappingPartitions = 0;
    _event.curDeadConnections = 0;
    _event.numConsumerErrors = 0;
    _event.numDataEvents = 0;
    _event.timestampOfLastHeartbeatMs = now;
    _event.timestampLastDataEventWasReceivedMs = 0;  // timestamp values are always stored in ms
  }

  // called only by getStatistics() in superclass (which acquires read lock)
  @Override
  protected void cloneData(UnifiedClientStatsEvent event)
  {
    // ConsumerCallbackStats doesn't do this... // event.timestampLastResetMs               = _event.timestampLastResetMs;
    event.aggregated                           = _event.aggregated;
    event.curBootstrappingPartitions           = _event.curBootstrappingPartitions;
    event.curDeadConnections                   = _event.curDeadConnections;
    event.numConsumerErrors                    = _event.numConsumerErrors;
    event.numDataEvents                        = _event.numDataEvents;
    event.timestampOfLastHeartbeatMs           = _event.timestampOfLastHeartbeatMs;
    event.timestampLastDataEventWasReceivedMs  = _event.timestampLastDataEventWasReceivedMs;
  }

  @Override
  protected UnifiedClientStatsEvent newDataEvent()
  {
    return new UnifiedClientStatsEvent();
  }

  @Override
  protected SpecificDatumWriter<UnifiedClientStatsEvent> getAvroWriter()
  {
    return new SpecificDatumWriter<UnifiedClientStatsEvent>(UnifiedClientStatsEvent.class);
  }

  // called by superclass's (AbstractMonitoringMBean's) mergeStats() and by
  // StatsCollectors.mergeStatsCollectors() -> resetAndMerge() -> merge(); callers
  // handle locking.  Anything other than a UnifiedClientStats (including null) is
  // logged and ignored.
  @Override
  protected void doMergeStats(Object eventData)
  {
    if (!(eventData instanceof UnifiedClientStats))
    {
      // eventData may be null here (null instanceof X is false), so don't dereference it blindly
      LOG.warn("Attempt to merge stats from unknown event class: " +
               (eventData == null ? "null" : eventData.getClass().getName()));
      return;
    }
    UnifiedClientStats otherEvent = (UnifiedClientStats)eventData;
    UnifiedClientStatsEvent e = otherEvent._event;

    _event.aggregated = true;

    // standalone metrics; aggregation = simple sum:
    _event.curBootstrappingPartitions  += e.curBootstrappingPartitions;
    _event.numConsumerErrors           += e.numConsumerErrors;
    _event.numDataEvents               += e.numDataEvents;

    // special-case, half-standalone/half-derived metric:  aggregation is slightly complicated...
    if (e.aggregated)
    {
      // we're the third (or higher) level up, so it's safe to trust the lower level's count; just add it in:
      _event.curDeadConnections += e.curDeadConnections;
    }
    else if (System.currentTimeMillis() - e.timestampOfLastHeartbeatMs > _deadnessThresholdMs)
    {
      // we're the second level (first level of aggregation), so we need to check the first level's timestamp
      // to see if its connection is dead
      ++_event.curDeadConnections;
    }

    // support metrics for timeLagLastReceivedToNowMs; since want worst case across aggregated time lags
    // (i.e., maximum interval), want _minimum_ (oldest) non-zero timestamp:
    if (_event.timestampLastDataEventWasReceivedMs == 0)
    {
      // other one is same or better, so assign unconditionally
      _event.timestampLastDataEventWasReceivedMs = e.timestampLastDataEventWasReceivedMs;
    }
    else if (e.timestampLastDataEventWasReceivedMs > 0)
    {
      // both are non-zero, so assign minimum
      _event.timestampLastDataEventWasReceivedMs =
          Math.min(_event.timestampLastDataEventWasReceivedMs, e.timestampLastDataEventWasReceivedMs);
    }
    // else e.timestampLastDataEventWasReceivedMs == 0, so ignore it

    // Support for timeLagSourceToReceiptMs histogram metrics, which uses exponentially weighted statistical
    // sampling; aggregation is tricky (see MergeableExponentiallyDecayingReservoir for details).  Special
    // cases:  (1) if otherEvent is bootstrapping, its getReservoirTimeLagSourceToReceiptMs() will be null, so
    // merge() will return immediately; (2) if no data events yet received, e.timestampLastDataEventWasReceivedMs
    // will be zero, so skip merge in that case.
    if (e.timestampLastDataEventWasReceivedMs > 0)
    {
      // we're an aggregate, so _reservoirTimeLagSourceToReceiptMs should never be null (aggregates can't bootstrap)
      _reservoirTimeLagSourceToReceiptMs.merge(otherEvent.getReservoirTimeLagSourceToReceiptMs());
    }

    // Support for timeLagConsumerCallbacksMs histogram metrics, which uses exponentially weighted statistical
    // sampling; aggregation is tricky (see MergeableExponentiallyDecayingReservoir for details).  No special
    // handling for bootstrap mode is needed.  If no callbacks have occurred, reservoirs will be empty, and all
    // percentile values will be -1.0 (see getTimeLagConsumerCallbacksMs_HistPct() below).
    _reservoirTimeLagConsumerCallbacksMs.merge(otherEvent.getReservoirTimeLagConsumerCallbacksMs());
  }

  /**
   * Merges another collector's data into this one under this collector's write lock.
   * The other collector's lock is not acquired here.
   */
  @Override
  public void merge(UnifiedClientStats obj)
  {
    if (!_enabled.get()) return;
    Lock writeLock = acquireWriteLock();
    try
    {
      doMergeStats(obj);
    }
    finally
    {
      releaseLock(writeLock);
    }
  }

  // called by StatsCollectors.mergeStatsCollectors()
  @Override
  public void resetAndMerge(List<UnifiedClientStats> objList)
  {
    Lock writeLock = acquireWriteLock();
    try
    {
      reset();
      for (UnifiedClientStats t: objList)
      {
        merge(t);
      }
    }
    finally
    {
      releaseLock(writeLock);
    }
  }

  // "Rich" copy of default impl in AbstractMonitoringMBean:  never pass UnifiedClientStatsEvent,
  // only UnifiedClientStats (which contains UnifiedClientStatsEvent _event).  We need this so
  // we can merge the internal reservoir (array) objects efficiently; we _really_ don't want to
  // convert and copy them to pass around inside _event.  (Histograms are so fun...)
  @Override
  public void mergeStats(DatabusMonitoringMBean<UnifiedClientStatsEvent> other)
  {
    if (! (other instanceof UnifiedClientStats)) return;

    UnifiedClientStats otherObj = (UnifiedClientStats)other;

    Lock otherReadLock = otherObj.acquireReadLock();
    Lock thisWriteLock = null;
    try
    {
      thisWriteLock = acquireWriteLock(otherReadLock);
      doMergeStats(otherObj);
    }
    finally
    {
      releaseLock(thisWriteLock);
      releaseLock(otherReadLock);
    }
  }



  // We use "cur" (a.k.a. "current") instead of "num" to prevent MBeanSensorHelper from treating this
  // metric as an RRD counter.  (We want gauge instead.)
  @Override
  public int getCurBootstrappingPartitions()
  {
    int result = 0;
    Lock readLock = acquireReadLock();
    try
    {
      result = _event.curBootstrappingPartitions;
    }
    finally
    {
      releaseLock(readLock);
    }
    return result;
  }

  // We use "cur" (a.k.a. "current") instead of "num" to prevent MBeanSensorHelper from treating this
  // metric as an RRD counter.  (We want gauge instead.)
  @Override
  public int getCurDeadConnections()
  {
    int result = 0;
    Lock readLock = acquireReadLock();
    try
    {
      // For the lowest-level (non-aggregated) stats, which we detect via register*() calls, this getter is the
      // most reasonable place to check whether we're dead.  But it may never be called (or only rarely), so we
      // can't rely on its calculation of curDeadConnections to compute the aggregated versions.  (And we
      // certainly can't depend on further register* calls if we're dead.)  Ergo, for aggregated versions, do a
      // separate calculation in doMergeStats().
      // NOTE(review):  this writes _event while holding only the read lock; concurrent readers would each
      // store an equivalent freshly-computed 0/1 value, so the race appears benign.
      if (!_event.aggregated)
      {
        long timeIntervalSinceHeartbeatMs = System.currentTimeMillis() - _event.timestampOfLastHeartbeatMs;
        _event.curDeadConnections = (timeIntervalSinceHeartbeatMs > _deadnessThresholdMs) ? 1 : 0;
      }
      result = _event.curDeadConnections;
    }
    finally
    {
      releaseLock(readLock);
    }
    return result;
  }

  @Override
  public long getNumConsumerErrors()
  {
    long result = 0;
    Lock readLock = acquireReadLock();
    try
    {
      result = _event.numConsumerErrors;
    }
    finally
    {
      releaseLock(readLock);
    }
    return result;
  }

  @Override
  public long getNumDataEvents()
  {
    long result = 0;
    Lock readLock = acquireReadLock();
    try
    {
      result = _event.numDataEvents;
    }
    finally
    {
      releaseLock(readLock);
    }
    return result;
  }

  // timeLagSourceToReceiptMs
  //  - this is for data events only => update in registerDataEventReceived()
  //  - value is _event.timestampLastDataEventWasReceivedMs - sourceTimestampOfLastEventReceivedMs
  //  - for getters:  if no events processed OR if bootstrap mode => all values are -1
  // timeLagSourceToReceiptMs_HistPct_50
  // timeLagSourceToReceiptMs_HistPct_90  // these are official "reserved suffixes" per
  // timeLagSourceToReceiptMs_HistPct_95  // https://iwww.corp.linkedin.com/wiki/cf/display/SOP/Getting+Started+with+Autometrics
  // timeLagSourceToReceiptMs_HistPct_99  // https://iwww.corp.linkedin.com/wiki/cf/display/SOP/Metrics+Naming+Policy
  @Override
  public double getTimeLagSourceToReceiptMs_HistPct_50()
  {
    return getTimeLagSourceToReceiptMs_HistPct(50.0);
  }

  @Override
  public double getTimeLagSourceToReceiptMs_HistPct_90()
  {
    return getTimeLagSourceToReceiptMs_HistPct(90.0);
  }

  @Override
  public double getTimeLagSourceToReceiptMs_HistPct_95()
  {
    return getTimeLagSourceToReceiptMs_HistPct(95.0);
  }

  @Override
  public double getTimeLagSourceToReceiptMs_HistPct_99()
  {
    return getTimeLagSourceToReceiptMs_HistPct(99.0);
  }

  // Returns the requested percentile of the source-to-receipt reservoir, or -1.0 when bootstrapping,
  // when no data events have been received, or when the reservoir is empty.
  private double getTimeLagSourceToReceiptMs_HistPct(double percentile)
  {
    double result = (double)AbstractMonitoringMBean.DEFAULT_MIN_LONG_VALUE;  // -1.0
    Lock readLock = acquireReadLock();
    try
    {
      // If we're an aggregate, _isBootstrapping should never be true, but all of our constituents
      // could be bootstrapping, in which case their reservoirs will all be null and ours will be
      // empty.  Alternatively, some constituents might be in online consumption, but if they haven't
      // yet received any data events, their reservoirs will be empty, and so will ours.  In either
      // case, we return -1.0 as a special value.
      if (!_isBootstrapping && _event.timestampLastDataEventWasReceivedMs > 0 &&
          _reservoirTimeLagSourceToReceiptMs != null && _reservoirTimeLagSourceToReceiptMs.size() > 0)
      {
        double[] dataValues = _reservoirTimeLagSourceToReceiptMs.getUnsortedValues();
        if (dataValues.length > 0)  // percentile() returns Double.NaN for empty arrays, but we want -1.0
        {
          result = StatUtils.percentile(dataValues, percentile);
        }
      }
    }
    finally
    {
      releaseLock(readLock);
    }
    return result;
  }

  // timeLagLastReceivedToNowMs
  // [want max time lag over whatever is being aggregated, i.e., track the _minimum_ (oldest) timestamp]
  // Returns -1 if no data event has been received since the last reset.
  @Override
  public long getTimeLagLastReceivedToNowMs()
  {
    long result = 0;
    Lock readLock = acquireReadLock();
    try
    {
      result = (_event.timestampLastDataEventWasReceivedMs != 0) ?
          System.currentTimeMillis() - _event.timestampLastDataEventWasReceivedMs : -1;
    }
    finally
    {
      releaseLock(readLock);
    }
    return result;
  }

  // timeLagConsumerCallbacksMs
  //  - this is for ALL callbacks => update in registerCallbacksProcessed()
  //  - value is timeElapsedNs / DbusConstants.NUM_NSECS_IN_MSEC == timeElapsedMs
  //  - for getters:  if no callbacks processed => all values are -1.0; no special handling for bootstraps
  // timeLagConsumerCallbacksMs_HistPct_50
  // timeLagConsumerCallbacksMs_HistPct_90  // these are official "reserved suffixes" per
  // timeLagConsumerCallbacksMs_HistPct_95  // https://iwww.corp.linkedin.com/wiki/cf/display/SOP/Getting+Started+with+Autometrics
  // timeLagConsumerCallbacksMs_HistPct_99  // https://iwww.corp.linkedin.com/wiki/cf/display/SOP/Metrics+Naming+Policy
  // timeLagConsumerCallbacksMs_Max
  @Override
  public double getTimeLagConsumerCallbacksMs_HistPct_50()
  {
    return getTimeLagConsumerCallbacksMs_HistPct(50.0);
  }

  @Override
  public double getTimeLagConsumerCallbacksMs_HistPct_90()
  {
    return getTimeLagConsumerCallbacksMs_HistPct(90.0);
  }

  @Override
  public double getTimeLagConsumerCallbacksMs_HistPct_95()
  {
    return getTimeLagConsumerCallbacksMs_HistPct(95.0);
  }

  @Override
  public double getTimeLagConsumerCallbacksMs_HistPct_99()
  {
    return getTimeLagConsumerCallbacksMs_HistPct(99.0);
  }

  // integer milliseconds would suffice for this metric, but Apache Commons Math's stats methods require doubles.
  // Returns -1.0 (never NaN) when the reservoir holds no samples.
  private double getTimeLagConsumerCallbacksMs_HistPct(double percentile)
  {
    double result = (double)AbstractMonitoringMBean.DEFAULT_MIN_LONG_VALUE;  // -1.0
    Lock readLock = acquireReadLock();
    try
    {
      if (_reservoirTimeLagConsumerCallbacksMs.size() > 0)
      {
        double[] dataValues = _reservoirTimeLagConsumerCallbacksMs.getUnsortedValues();
        if (dataValues.length > 0)  // percentile() returns Double.NaN for empty arrays, but we want -1.0
        {
          result = StatUtils.percentile(dataValues, percentile);
        }
      }
    }
    finally
    {
      releaseLock(readLock);
    }
    return result;
  }

  // Returns the largest sampled callback time (ms), or -1.0 (never NaN) when the reservoir holds no samples.
  @Override
  public double getTimeLagConsumerCallbacksMs_Max()
  {
    double result = (double)AbstractMonitoringMBean.DEFAULT_MIN_LONG_VALUE;  // -1.0
    Lock readLock = acquireReadLock();
    try
    {
      if (_reservoirTimeLagConsumerCallbacksMs.size() > 0)
      {
        double[] dataValues = _reservoirTimeLagConsumerCallbacksMs.getUnsortedValues();
        if (dataValues.length > 0)  // max() returns Double.NaN for empty arrays, but we want -1.0
        {
          result = StatUtils.max(dataValues);
        }
      }
    }
    finally
    {
      releaseLock(readLock);
    }
    return result;
  }

  @Override
  public long getTimeSinceLastResetMs()
  {
    long result = 0;
    Lock readLock = acquireReadLock();
    try
    {
      result = System.currentTimeMillis() - _event.timestampLastResetMs;
    }
    finally
    {
      releaseLock(readLock);
    }
    return result;
  }

  @Override
  public long getTimestampLastResetMs()
  {
    long result = 0;
    Lock readLock = acquireReadLock();
    try
    {
      result = _event.timestampLastResetMs;
    }
    finally
    {
      releaseLock(readLock);
    }
    return result;
  }

  //----------------------------- "EVENTS RECEIVED" CALLS -----------------------------
  // ("received" by client lib from relay or bootstrap but not yet passed to consumer)

  // called only by MultiConsumerCallback => only for lowest-level (non-aggregated) stats
  public void registerDataEventReceived(DbusEvent e)
  {
    if (!_enabled.get()) return;
    Lock writeLock = acquireWriteLock();
    try
    {
      ++_event.numDataEvents;
      _event.timestampLastDataEventWasReceivedMs = System.currentTimeMillis();
      // if we're bootstrapping, we'll return -1 regardless, so no need to waste time storing data
      if (!_isBootstrapping)
      {
        // not bootstrapping, so _reservoirTimeLagSourceToReceiptMs shouldn't be null
        final long sourceTimestampOfLastEventReceivedMs = e.timestampInNanos() / DbusConstants.NUM_NSECS_IN_MSEC;
        // value = receipt-minus-source lag (ms); second argument is the sample time in seconds
        _reservoirTimeLagSourceToReceiptMs.update(
            _event.timestampLastDataEventWasReceivedMs - sourceTimestampOfLastEventReceivedMs,
            _event.timestampLastDataEventWasReceivedMs / DbusConstants.NUM_MSECS_IN_SEC);
      }
    }
    finally
    {
      releaseLock(writeLock);
    }
  }

  //----------------------------- "EVENTS PROCESSED" CALLS -----------------------------
  // ("processed" by consumer, i.e., output side of client lib)

  // called by *ConsumerCallbackFactory
  public void registerCallbacksProcessed(long timeElapsedNs)  // note that ConsumerCallbackStats version uses ms
  {
    if (!_enabled.get()) return;
    Lock writeLock = acquireWriteLock();
    try
    {
      // store as fractional milliseconds
      _reservoirTimeLagConsumerCallbacksMs.update((double)timeElapsedNs / DbusConstants.NUM_NSECS_IN_MSEC);
    }
    finally
    {
      releaseLock(writeLock);
    }
  }

  //----------------------------- "ERRORS PROCESSED" CALLS -----------------------------

  // was registerErrorEventsProcessed()
  public void registerCallbackError()
  {
    if (!_enabled.get()) return;
    Lock writeLock = acquireWriteLock();
    try
    {
      ++_event.numConsumerErrors;
    }
    finally
    {
      releaseLock(writeLock);
    }
  }

}
